package day02.source;

import beans.SensorReading;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;

/**
 * Custom Flink {@link SourceFunction} that emits simulated temperature readings.
 *
 * <p>Ten sensors, named {@code sensor_1} through {@code sensor_10}, are seeded with a
 * Gaussian-distributed initial temperature (mean 60, standard deviation 20). While the
 * source is running, each round moves every sensor's temperature by one standard-normal
 * step, emits one {@link SensorReading} per sensor stamped with the current wall-clock
 * time, and then sleeps for one second.
 *
 * @author lvbingbing
 * @date 2021-11-17 23:26
 */
public class MySensor implements SourceFunction<SensorReading> {
    /** Number of simulated sensors. */
    private static final int SENSOR_COUNT = 10;

    /** Pause between two emission rounds, in milliseconds. */
    private static final long EMIT_INTERVAL_MILLIS = 1000L;

    // Flag controlling data generation. It is written by cancel() and read by the loop in
    // run(); volatile guarantees that the loop observes the write when the two methods
    // execute on different threads.
    private volatile boolean running = true;

    /**
     * Generates readings until {@link #cancel()} clears the running flag.
     *
     * @param ctx context used to emit the generated readings
     * @throws Exception if the sleep between rounds is interrupted, or if emitting fails
     */
    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        // Random number generator. The default SecureRandom is used instead of
        // SecureRandom.getInstanceStrong(): the "strong" variant may block waiting for
        // entropy on some platforms, which is unnecessary for simulated data.
        SecureRandom random = new SecureRandom();

        // Seed the initial temperature of every sensor.
        Map<String, Double> sensorTemperatureMap = new HashMap<>();
        for (int i = 1; i <= SENSOR_COUNT; i++) {
            sensorTemperatureMap.put("sensor_" + i, 60 + random.nextGaussian() * 20);
        }

        while (running) {
            for (Map.Entry<String, Double> entry : sensorTemperatureMap.entrySet()) {
                // Random fluctuation on top of the current temperature.
                double newTemperature = entry.getValue() + random.nextGaussian();
                // Update through the entry: this is the supported way to change a value
                // while iterating over the entry set.
                entry.setValue(newTemperature);
                ctx.collect(new SensorReading(entry.getKey(), System.currentTimeMillis(), newTemperature));
            }
            // Throttle the output rate.
            Thread.sleep(EMIT_INTERVAL_MILLIS);
        }
    }

    /**
     * Requests the generation loop in {@link #run(SourceContext)} to stop; the loop exits
     * the next time it checks the flag.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
